package org.snake.nebulae.core.task;

import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.snake.nebulae.core.model.ServiceInfo;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

@Slf4j
public class ClearSercieTask implements Runnable {

    /**
     * 初始化标记
     */
    private boolean init = false;

    /**
     * 核心任务
     */
    private CoreTask coreTask;

    /**
     * 清理任务
     */
    private ClearSercieTask self;

    /**
     * 线程池
     */
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /**
     * 扩展点
     */
    private CompletableFuture<CoreTask> pluginPoint;

    /**
     * 定时延迟时间
     */
    private int delayTime = 5000;

    /**
     * 服务超时时间
     */
    private int timeout = 60000;

    public ClearSercieTask(CompletableFuture<CoreTask> pluginPoint, ThreadPoolTaskScheduler threadPoolTaskScheduler) {
        this.pluginPoint = pluginPoint;
        this.threadPoolTaskScheduler = threadPoolTaskScheduler;
        this.self = this;

        Consumer<CoreTask> doInit = coreTask -> {
            // 如果没有初始化过那么把自己注册进线程池中执行
            if (!init) {
                this.coreTask = coreTask;

                this.threadPoolTaskScheduler.scheduleWithFixedDelay(self, delayTime);
            }
        };

        // 接受处理
        this.pluginPoint.thenAcceptAsync(doInit);
    }

    @Override
    public void run() {
        long startTime = System.currentTimeMillis();
        log.info("{} 开始定时清理过期服务/失效服务 时间是 {}", self.getClass(), DateUtil.now());

        ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>> services = coreTask.getServices();

        if (services.size() == 0) {
            log.error("{} 定时清理过期任务/失效服务失败 时间是 {} 原因是 {}", self.getClass(), DateUtil.now(), "当前没有服务");
            return;
        }

        services.forEach((name, serviceList) -> {

            // 清理失效/过期具体方法实例
            serviceList.forEach((methodName, serviceLink) -> {

                ConcurrentLinkedQueue<ServiceInfo> removeService = new ConcurrentLinkedQueue<>();

                // 收集所有需要移除的服务
                serviceLink.forEach(serviceInfo -> {
                    long diffTime = System.currentTimeMillis() - serviceInfo.getLastHeartBeatTime();
                    if (diffTime > timeout) {
                        removeService.add(serviceInfo);
                    }
                });

                if (removeService.size() > 0) {
                    serviceLink.removeAll(removeService);
                    log.info("{} 定时清理过期任务/失效服务成功 时间是 {} 条数 {}", self.getClass(), DateUtil.now(), removeService.size());
                } else {
                    log.info("{} 定时清理过期任务/失效服务失败 时间是 {} 原因是 {}", self.getClass(), DateUtil.now(), "没有符合相关的条数");
                }

                if (serviceLink.size() == 0) {
                    serviceList.remove(methodName);
                }
            });

            // 清理没有方法实例
            if (serviceList.size() == 0) {
                services.remove(name);
            }
        });
        log.info("{} 本次清理过期任务/失效服务完成 总共花费时间 {}", self.getClass(), (System.currentTimeMillis() - startTime));
    }
}
